Chaining Datasets¶
Warning
Dependencies are still under development. The current implementation has been added owing to its usefulness, but expect changes in subsequent releases. Every effort will be made to ensure the stability of current methods; however, you should always check the release notes before upgrading versions if you have an important workflow that relies on dependencies.
Imagine a scenario where you want to run a heavy calculation which produces a very large output. If you wanted to postprocess those results, you could collect them with fetch_results and process them locally. This, however, could take time to transfer and would use up local disk space. Datasets have a better way of doing this, called “dependencies”.
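For comparison, a workflow without dependencies might look something like the sketch below. The heavy_calculation function and its argument are hypothetical stand-ins; the Dataset calls mirror those used later in this section.
from remotemanager import Dataset, URL

def heavy_calculation(n):
    # hypothetical stand-in for an expensive job with a large output
    return list(range(n))

ds = Dataset(function=heavy_calculation, url=URL(), skip=False)
ds.append_run(args={'n': 1_000_000})
ds.run()
ds.wait(1, timeout=60)
ds.fetch_results()  # the full (potentially huge) output is copied back locally

# postprocessing then happens locally, on the transferred data
print(sum(ds.results[0]))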
Dependencies¶
Dependencies allow you to chain jobs together on the remote machine, submitting with different parameters. For example, we can submit a calculation on 50 nodes, and then postprocess the result using only one cpu core.
Currently, dependencies are limited to a linear chain, and only on a one-to-one basis. This means jobs can be joined together in an A -> B -> C sense, and their runners will have a continuous line between them. While expansion is planned, let's look into the current implementation and how it can help our hypothetical scenario.
Let's begin by defining three functions this time:
[2]:
from remotemanager import Dataset, URL

def init(offset):
    return offset

def mult(x, y):
    offset = loaded
    return offset + (x * y)

def post():
    return f'The final result is {loaded}'
This workflow does three things:
1. An “offset” is specified. Think of it as the c in a y = mx + c equation.
2. Two numbers are multiplied together and added to the offset (your mx).
3. The result is formatted into a string and returned.
An aside on loaded¶
loaded is a property that is added by the dependency network, and it simply allows a function to access the returned value of the function immediately before it in the chain. Think of it as exactly what is given by the return of a function:
[3]:
def multi_return():
    return 1, 2, 3

def multi_process():
    a, b, c = loaded
    return a + b + c
The output of the above functions would always be 6.
Creating your chained runners¶
Going back to our original workflow, let's create the datasets. This is done as normal:
[4]:
url = URL()  # again, a "local" url for testing purposes

dataset_init = Dataset(function = init,
                       name = 'init',
                       url = url,
                       remote_dir = 'temp_remote',
                       local_dir = 'temp_local',
                       mpi = 1,
                       omp = 1,
                       nodes = 1,
                       skip = False)

dataset_calc = Dataset(function = mult,
                       name = 'calc',
                       url = url,
                       remote_dir = 'temp_remote',
                       local_dir = 'temp_local',
                       mpi = 64,
                       omp = 4,
                       nodes = 50,
                       skip = False)

dataset_post = Dataset(function = post,
                       name = 'post',
                       url = url,
                       remote_dir = 'temp_remote',
                       local_dir = 'temp_local',
                       mpi = 1,
                       omp = 1,
                       nodes = 1,
                       skip = False)

dataset_init.set_downstream(dataset_calc) # new option!
dataset_calc.set_downstream(dataset_post)
The new option here to pay attention to is set_downstream, though there is also its mirror, set_upstream. These dictate the order in which the datasets will be run.
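For completeness, the same chain could equally be declared from the bottom up with set_upstream. This is just a sketch of the alternative wiring and is not executed here:
# equivalent chain, declared child-first via set_upstream
dataset_post.set_upstream(dataset_calc)
dataset_calc.set_upstream(dataset_init)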
Doing this creates a global Dependency object that can be accessed from any of the Datasets:
[5]:
dataset_post.dependency.network
[5]:
[(dataset-init-40221af0, dataset-calc-dce4a953),
(dataset-calc-dce4a953, dataset-post-794d5d4f)]
[6]:
dataset_init.dependency is dataset_calc.dependency is dataset_post.dependency
[6]:
True
Here we can see the two “edges” of this network: first a connection between init and calc, then a second one between calc and post.
There are also some extra methods to check whether a dataset has parents or children:
[7]:
print('init:')
print('Is parent?', dataset_init.is_parent)
print('Is child?', dataset_init.is_child)
print('\ncalc:')
print('Is parent?', dataset_calc.is_parent)
print('Is child?', dataset_calc.is_child)
init:
Is parent? True
Is child? False
calc:
Is parent? True
Is child? True
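The post dataset sits at the end of the chain, so we would expect the reverse of init. As a quick check (output not shown here):
# post has a parent (calc) but no children of its own,
# so we expect is_parent to be False and is_child to be True
print('post:')
print('Is parent?', dataset_post.is_parent)
print('Is child?', dataset_post.is_child)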
You may append runs to any dataset within this chain, and a runner will be created in every dataset. When appending runs, you must include all arguments. Best practice here is to select a Dataset within the chain to append your runs to, and stick to it.
Important
You can avoid “chaining” run_args through to the other datasets by passing chain_run_args=False to the run append. That way, any args such as mpi, omp, etc. will be passed only to the dataset on which append_run was called.
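As a sketch (not executed in this notebook), such an append might look like the following; passing mpi and omp directly to append_run in this way is an assumption based on the note above:
# hypothetical append: the large resource request stays on dataset_calc only,
# rather than being chained through to the init and post datasets
dataset_calc.append_run(args={'x': 5, 'y': 2, 'offset': 10},
                        mpi=64,
                        omp=4,
                        chain_run_args=False)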
[8]:
dataset_post.append_run(args={'x': 5, 'y': 2, 'offset': 10})
dataset_post.append_run(args={'x': 3, 'y': 7, 'offset': 0})
appended run runner-0
appended run runner-0
appended run runner-0
appended run runner-1
appended run runner-1
appended run runner-1
Now that we have appended runs to the main dataset, we can see that runners have been added to all three:
[9]:
print(f'initial dataset has {len(dataset_init.runners)} runners')
print(f'calculation dataset has {len(dataset_calc.runners)} runners')
print(f'postprocess dataset has {len(dataset_post.runners)} runners')
initial dataset has 2 runners
calculation dataset has 2 runners
postprocess dataset has 2 runners
Running¶
Datasets can be run as normal; submit from any Dataset in the chain to initiate the run:
[10]:
dataset_calc.run()
Staging Dependency
[0] dataset-init-40221af0... Done, 2/2 Runners staged
[1] dataset-calc-dce4a953... Done, 2/2 Runners staged
[2] dataset-post-794d5d4f... Done, 2/2 Runners staged
Done
Transferring for 8 Runners
Transferring 15 Files... Done
Remotely executing 2 Runners
[10]:
True
[11]:
dataset_post.wait(1, timeout=10)
[12]:
dataset_post.fetch_results()
Fetching results
Transferring 4 Files... Done
[13]:
print(dataset_post.results)
['The final result is 20', 'The final result is 21']
Checking interim results¶
Here, our output looks as we expect, but what do we do if it doesn't? We could start by investigating the calculation dataset, as in a real case it is the most likely cause of any problems.
Internally, these datasets are no different from any other, so all their methods work exactly as you'd expect. The only limitations are the ones mentioned previously about appending runners and running parents.
[14]:
dataset_init.fetch_results()
print(dataset_init.results)
Fetching results
Transferring 4 Files... Done
[10, 0]
[15]:
print(dataset_init.states)
[RunnerState("satisfied"), RunnerState("satisfied")]
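The intermediate calculation dataset can be inspected in exactly the same way. Given the inputs above, its raw results should be the values 20 and 21, before the final formatting step (a quick sketch, not run here):
# fetch the intermediate results from the calc stage
dataset_calc.fetch_results()
print(dataset_calc.results)  # expected: [20, 21]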
Note
A final note on branching jobs: at present, jobs having multiple children are supported in a limited sense. Your job tree can expand as you go down the chain, but the opposite is not true: you cannot “merge” returned values into a single child.
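As a rough sketch of that distinction (the functions and dataset names here are hypothetical, and the exact behaviour of branching may change between releases):
from remotemanager import Dataset

def work():
    return 1

def branch_a():
    return loaded + 1

def branch_b():
    return loaded + 2

ds_parent = Dataset(work, name='parent', skip=False)
ds_child_a = Dataset(branch_a, name='child_a', skip=False)
ds_child_b = Dataset(branch_b, name='child_b', skip=False)

# allowed: the tree expands downwards, one parent feeding two children
ds_parent.set_downstream(ds_child_a)
ds_parent.set_downstream(ds_child_b)

# not allowed: giving a dataset a second parent to "merge" returned values
# ds_child_a.set_downstream(ds_child_b)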
Environment Variables¶
You can set environment variables with the extra keyword arg in datasets (or runners). With dependencies, there are a few extra details to note.
Any extra set at the Dataset level will be applied for that dataset. So if you add extra="export KEY='VAR'" to a parent, it will be available for the children, but not in reverse.
[16]:
from remotemanager import RemoteFunction
@RemoteFunction
def search_env(nvars):
import os
found = {}
for i in range(nvars):
varname = f"var{i+1}"
found[varname] = os.environ.get(varname, None)
return found
def parent(nvars):
return search_env(nvars)
def child(nvars):
child_vars = search_env(nvars)
output = {}
for key in child_vars:
output[key] = [loaded[key], child_vars[key]]
return output
ds_1 = Dataset(parent, skip=False, extra="export var1='parentvar'")
ds_2 = Dataset(child, skip=False, extra="export var2='childvar'")
ds_1.set_downstream(ds_2)
ds_1.append_run({"nvars": 4}, extra="export var3='appendvar'")
ds_2.run(extra="export var4='runvar'")
ds_2.wait(1, 10)
ds_2.fetch_results()
ds_2.results
appended run runner-0
appended run runner-0
Staging Dependency
[0] dataset-dataset-b0832d53... Done, 1/1 Runners staged
[1] dataset-dataset-1eaa68f9... Done, 1/1 Runners staged
Done
Transferring for 2 Runners
Transferring 7 Files... Done
Remotely executing 1 Runner
Fetching results
Transferring 2 Files... Done
[16]:
[{'var1': ['parentvar', 'parentvar'],
'var2': [None, 'childvar'],
'var3': ['appendvar', 'appendvar'],
'var4': ['runvar', 'runvar']}]
Let's analyse these results.
var1 was added at the level of the parent dataset, so it is exported in the first jobscript. var2 was added at the level of the child dataset, so it is exported in the second jobscript. Because of this, var1 is available in both the parent and the child, whereas var2 is only available in the child.
var3 was added at the append level and is propagated to both appended runners (one for each dataset). var4 is similar: since it is set at the run level it is temporary, but it is added to both datasets.